Skip to content

VReplication: Implement Experimental Parallel Applier#19535

Open
mattlord wants to merge 95 commits into
mainfrom
vstream_parallel_replication
Open

VReplication: Implement Experimental Parallel Applier#19535
mattlord wants to merge 95 commits into
mainfrom
vstream_parallel_replication

Conversation

@mattlord

@mattlord mattlord commented Mar 2, 2026

Copy link
Copy Markdown
Member

Description

Warning

This feature is experimental

The parallel apply system enables VReplication to apply binlog events using multiple concurrent MySQL connections instead of a single serial connection. When --vreplication-parallel-replication-workers is set to N > 1, incoming transactions are analyzed for writeset conflicts and dispatched to N worker goroutines, each with its own MySQL connection. Transactions that touch different primary keys run concurrently; transactions that conflict are serialized.

The design is inspired by MySQL's own multi-threaded replica applier (MTA), specifically the WRITESET dependency tracking mode. We recompute writesets on the target side from the row events themselves, rather than relying on the source's binlog_transaction_dependency_tracking setting. This means the parallel applier works regardless of whether the source uses COMMIT_ORDER or WRITESET dependency tracking — or no dependency tracking at all. That is a critical property as we need to support importing databases of various versions and configurations into Vitess.

The design — most notably the in order commits of transactions (see more below) — also supports another critical property which is that it must live alongside the traditional serial applier/vplayer and provide the same external/visible semantics (the position and lag management, metrics, etc). This will be experimental for some time and it needs to be given time to bake before we can even consider fully replacing the serial applier code. We cannot risk destabilizing this critical component or primitive within VReplication because performance without correctness is worthless.

Goroutine Architecture

Four types of goroutines cooperate in a pipeline:

VStream → relayLog → [scheduleLoop] → applyScheduler → [workerLoop ×N] → commitCh → [commitLoop]
                          │                                                              │
                          └──── vp.serialMu (main connection) ───────────────────────────┘

1. scheduleLoop (single goroutine)

The schedule loop reads batched events from the relay log, parses them into logical transactions, builds writesets, and enqueues applyTxn structs into the scheduler. It runs on the main goroutine of applyEventsParallel.

Responsibilities:

  • Fetches event batches from the relay log via relay.Fetch()
  • Tracks transaction boundaries (GTID → events → COMMIT)
  • Extracts commit metadata (sequenceNumber, commitParent) from GTID events
  • Classifies transactions as row-only or mixed (DDL, OTHER, JOURNAL events make a transaction non-row-only; FIELD events are metadata and do not affect classification)
  • Builds PK-based writeset keys for row-only transactions
  • Handles transaction batching (merging consecutive transactions)
  • Handles empty transactions via unsavedEvent (bypasses the scheduler entirely)
  • Handles heartbeats inline (not enqueued through the scheduler)
  • Handles throttling by estimating lag and updating time_throttled

2. workerLoop (N goroutines)

Each worker goroutine calls scheduler.nextReady() to block until a transaction is ready to execute, then applies its row events using the worker's private MySQL connection.

Responsibilities:

  • Waits for the scheduler to declare a transaction ready
  • Builds a narrow worker-local view of the vplayer (workerLocalVPlayer) once per worker lifetime, exposing only the fields workers may share (tablePlans, replicatorPlan, serialMu, etc); foreign_key_checks session state is tracked per connection on the vdbClient, initialized when the worker's connections are created
  • Applies each event via worker.applyEvent(), which rebinds the worker's dbClient/query/commit onto that local view (the orchestrator's vplayer is never mutated)
  • On completion, attaches the worker's connection info to the payload and sends the transaction to commitCh
  • Blocks on txn.done until commitLoop finishes committing (this prevents the worker from reusing its connection while commitLoop is still writing to _vt.vreplication on it)
  • Worker connections run at READ COMMITTED: the writeset model covers PK/unique/FK conflicts but cannot see InnoDB gap/next-key locks, which REPEATABLE READ takes even for point operations on absent rows (e.g. a DELETE of a row that doesn't exist, or delete-marking in a non-unique secondary index). A later-ordered transaction's gap lock blocking an earlier-ordered transaction's INSERT would deadlock through the commitLoop's strict ordering — invisible to InnoDB's deadlock detector (MySQL's MTA has its Commit_order_manager for exactly this). RC takes no gap locks for row-image application and is MySQL's own recommendation for row-based parallel replicas; statement-based events force-serialize, so RC cannot change their outcome.

3. commitLoop (single goroutine)

The commit loop receives completed transactions from workers and commits them in strict order (by order field). This guarantees that the position saved to _vt.vreplication is monotonically increasing.

Responsibilities:

  • Maintains a pending map and nextOrder counter
  • Buffers out-of-order arrivals; commits in strict sequence
  • For worker transactions: writes the position update and COMMIT directly on the worker's connection (via the payload's client/query/commit) WITHOUT holding serialMu, so slow MySQL commits never block the scheduleLoop; only the brief vplayer bookkeeping afterwards takes serialMu
  • For commitOnly transactions (DDL, OTHER, JOURNAL, position saves): applies events and updates position on the main connection
  • Updates lag metrics after each commit
  • Signals the scheduler via markCommitted() to release inflight state
  • Returns io.EOF when stop position is reached

4. applyScheduler (shared state, not a goroutine)

The scheduler is a shared data structure protected by a mutex. It determines which transactions can execute concurrently based on writeset conflicts and transaction classification.

Transaction Classification

Every transaction enqueued to the scheduler is classified with these boolean flags:

Flag Meaning Set When
forceGlobal Must serialize with everything Non-row-only transactions (contain DDL, OTHER, or JOURNAL events), or copy phase, or writeset build failure. FIELD events are pure metadata and do NOT trigger forceGlobal.
hasCommitMeta Has MySQL commit dependency metadata sequenceNumber != 0 || commitParent != 0 in the GTID event
noConflict Always ready, bypasses all checks Position-only saves (unsaved event timeout)

Ready-Check Hierarchy (isReadyLocked)

The scheduler checks readiness in this order:

  1. noConflict → always ready. These are position-only saves that have no data and no side effects beyond updating _vt.vreplication. They bypass all conflict checking to prevent deadlocks where a position save with an earlier order is blocked by inflight data transactions.

  2. inflightGlobal > 0 → blocked. Any inflight global transaction blocks everything.

  3. forceGlobal → ready only when ALL inflight counters are zero (no inflight transactions of any kind).

  4. hasCommitMeta with inflightMissingMeta > 0 → blocked. Transactions with commit metadata cannot run alongside transactions lacking it (safety boundary).

  5. hasCommitMeta with non-empty writeset → ready if no inflight writeset conflicts. This is the key optimization: writeset-only conflict detection, skipping the commit-parent dependency check entirely. This allows parallelism even when the source is not using WRITESET based dependency tracking, which would otherwise produce a strict serial chain.

  6. hasCommitMeta with empty writeset → falls back to commit-parent ordering (commitParent <= lastCommittedSequence).

  7. No commit metadata, no writeset → treated as global (increments inflightGlobal).

  8. No commit metadata, has writeset → checks writeset conflicts + must wait if inflightCommitMeta > 0.

Inflight Tracking

Four counters track what's currently being applied:

Counter Tracks
inflightGlobal forceGlobal transactions + no-meta-no-writeset transactions
inflightMissingMeta All transactions without commit metadata
inflightCommitMeta Transactions with commit metadata
inflightWriteset Per-key reference counts for writeset conflict detection

Writeset Conflict Detection

PK-Based Keys

For each row change (INSERT, UPDATE, DELETE), the writeset extractor hashes the table name and primary key values into a uint64 using xxhash (XXH64). Conceptually the key represents tableName:pk1,pk2,..., but using fixed-size hashes instead of heap-allocated strings eliminates the dominant per-transaction allocation source at high TPS. Both the before-image and after-image are hashed because an UPDATE that changes a PK value must conflict with both the old and new key.

The PK column indices are stored in TablePlan.PKIndices (a []bool where true at index i means column i is part of the PK). This is populated when the replicator plan is built from FIELD events.

Unique-Key Keys

Unique secondary indexes make transactions on different rows order-dependent: one transaction frees a unique value and another claims it, so PK keys alone would schedule the pair in parallel and the second to apply would hit a duplicate-key error. Mirroring MySQL's WRITESET dependency tracking — which hashes every unique key, not just the PK — the writeset extractor also emits a key per hashable unique secondary index for both row images, with the index ordinal folded into the digest so different indexes on the same table occupy distinct key spaces. A NULL in any key column emits no key for that image (MySQL unique indexes permit multiple NULLs, so a NULL-valued key cannot conflict with anything). Only transactions actually colliding on a unique value serialize against each other; everything else stays parallel.

FK-Aware Keys

When foreign key constraints exist, a transaction that modifies a child table row must serialize with transactions that modify the referenced parent row. At startup, the parallel applier queries information_schema.KEY_COLUMN_USAGE to discover all FK constraints. For each child row change, it hashes the parent table name and FK column values into a uint64 — using the same xxhash scheme as PK keys, so the hash will match the parent table's PK-based writeset hash. This forces the scheduler to see a conflict between child and parent operations.

Copy-on-Write Table Plan Snapshot

The scheduleLoop needs to read tablePlans to build writesets, but the serial applier path mutates tablePlans when FIELD events arrive. To avoid holding a read lock across writeset computation, we use a copy-on-write snapshot: snapshotTablePlans copies the map only when tablePlansVersion has changed since the last snapshot. FIELD events increment the version.

Serialization Escapes (fail closed)

Whenever the writeset hasher cannot prove the absence of a conflict, the transaction is routed to the serial path (forceGlobal) rather than guessed at:

  • Unhashable unique indexes: prefix-length and expression/functional unique indexes enforce uniqueness over a derived value the hasher cannot reproduce (a full-value hash would miss conflicts), and a PK that doesn't match the replication identity breaks the keys-imply-identity reasoning — tables carrying these force-serialize. Plain-column unique secondaries are handled by unique-key hashing (see Unique-Key Keys above) instead of serialization.
  • Unsupported column mappings: plans that rewrite, project, or reorder columns (expressions, generated columns, lossy casts) produce hash inputs that don't correspond 1:1 to the row image, so they serialize.
  • Partial row images (DataColumns/JsonPartialValues, noblob -1 lengths on relevant columns): omitted columns can shift values into wrong field slots, so they serialize.
  • No usable identity: a plan with no PK/identity columns contributes an error (not silently zero keys) so the txn serializes instead of racing untracked.
  • DDL: after an executed DDL, FK metadata is re-queried from information_schema and a per-table stale-plan barrier force-serializes transactions touching affected tables until their refreshed FIELD event replaces the cached plan.

Transaction Batching

The schedule loop merges consecutive transactions in the same relay log fetch into a single larger transaction, mirroring the serial applier's hasAnotherCommit lookahead. This reduces the number of MySQL COMMITs.

Batching rules:

  • If another COMMIT exists ahead in the current relay batch and we don't need to force-save, skip the flush and let events accumulate
  • Time-bounded at 500ms: during catch-up, heartbeats may not arrive to force a flush, so we force one every 500ms to keep time_updated and lag metrics fresh
  • Skipped when FK refs exist: large batches merge many parent/child operations into a single writeset, causing nearly all batches to conflict on FK ref keys and effectively serializing everything. Flushing each source transaction individually lets the scheduler detect truly independent transactions.

When batching merges a transaction, its sequenceNumber is advanced via scheduler.advanceCommittedSequence() so that future transactions whose commitParent references the merged-away sequence are not blocked forever.

Empty Transaction Handling

Empty transactions (GTID → BEGIN → COMMIT with no row events) are very common (from filtered-out tables on the same source shard). They bypass the scheduler entirely:

  • The COMMIT is stored as vp.unsavedEvent
  • If idleTimeout (1 second) passes with no real commits, the position is saved as a noConflict commit-only transaction via enqueueCommitOnly
  • The scheduler's lastCommittedSequence is advanced for the empty transaction's sequence number, preventing dependent transactions from being blocked

During catch-up, empty transactions can arrive in a continuous stream, preventing the idle timeout from firing. A separate lastHeartbeatRefresh timer periodically updates time_updated directly via SQL to keep max_v_replication_lag fresh.

Commit Ordering

Commits must be strictly ordered because _vt.vreplication stores a single position, and that position must only move forward. The commitLoop achieves this with:

  1. Each transaction gets a monotonically increasing order number (assigned by scheduleLoop)
  2. The commitLoop maintains nextOrder starting at 1
  3. Transactions arrive out of order via commitCh; they're buffered in a pending map
  4. Transactions are committed strictly in order: nextOrder is incremented after each commit
  5. After committing, markCommitted() releases the transaction's inflight state in the scheduler, potentially unblocking waiting transactions (a released multi-key writeset can ready several pending transactions; dispatched workers pass the wakeup baton so all of them start without waiting for the next commit)

End-to-end backpressure: the scheduler caps outstanding ordered work at roughly one applying transaction per worker plus the commit buffer (maxOutstandingOrders ≈ 5× workers), so a commitLoop stalled on an early order bounds how far the pipeline can run ahead of durable progress.

Worker Transaction Commit Protocol

When committing a worker's transaction, the commitLoop (without holding serialMu for any of the MySQL work):

  1. Writes the position update on the worker's connection via updatePosWithoutStop — inside the worker's open MySQL transaction (in batch mode this rides the same multi-statement flush as the COMMIT)
  2. If the stop position was reached, writes the Stopped state via setStopPositionStateImmediate on the same worker connection, inside the same transaction — so position, row changes, and stop state commit atomically and no cross-connection lock ordering exists
  3. Commits the worker's MySQL transaction
  4. Briefly takes serialMu to update vplayer bookkeeping (recordPositionSave, pending FIELD-refresh counters)
  5. Calls markCommitted() to release the transaction's inflight state in the scheduler
  6. Sends on txn.done (a buffered channel) to let the worker reuse the connection; returns io.EOF if the stop position was reached

Startup and Configuration

Flag: --vreplication-parallel-replication-workers N

  • Default: 1 (serial apply, no parallelism)
  • Set to N > 1 to enable parallel apply with N worker goroutines; capped at 64 (the per-workflow override rejects larger values, the tablet flag clamps with a warning)
  • Each worker holds two MySQL connections (double-buffered apply), so a workflow uses 2N+2 connections total
  • Can also be set per-workflow via the vreplication-parallel-replication-workers config override in the workflow's options column

Activation path:

  1. vreplicator.replicate() creates a vplayer and calls vp.play()vp.fetchAndApply()
  2. fetchAndApply checks ParallelReplicationWorkers > 1 and len(copyState) == 0
  3. If both conditions are met, calls vp.applyEventsParallel() instead of vp.applyEvents()
  4. The parallel applier creates N applyWorker instances, each with its own filtered vdbClient MySQL connection
  5. FK constraints are queried from information_schema at startup

Parallel apply is only active during the replication (running) phase, not during the copy phase. During copy, the serial applier is always used.

Shutdown Protocol

applyEventsParallel orchestrates a clean shutdown sequence:

  1. scheduleLoop returns (either from error, context cancellation, or relay log EOF)
  2. scheduler.close() is called — broadcasts to wake all blocked workers
  3. wg.Wait() — waits for all worker goroutines to exit
  4. close(commitCh) — signals commitLoop that no more transactions will arrive
  5. <-commitDone — waits for commitLoop to drain remaining buffered transactions
  6. Final error is determined: applyErr (from commitLoop/scheduleLoop) takes priority over workerErr
  7. io.EOF and context.Canceled are converted to nil (the caller treats nil as a clean stop)

Relationship to MySQL Multi-Threaded Applier

MySQL's multi-threaded replica applier (MTA) has three dependency tracking modes:

Mode How It Works
COMMIT_ORDER Transactions can only execute in parallel if they were committed in parallel on the source (same commit parent). In practice, this often serializes everything.
WRITESET Transactions with non-conflicting writesets can run in parallel regardless of commit order. MySQL tracks writesets in the binary log via binlog_transaction_dependency_tracking.
WRITESET_SESSION Like WRITESET, but transactions from the same session are additionally serialized.

The Vitess parallel applier most closely resembles WRITESET mode, but with key differences:

  1. Writesets are computed on the target, not taken from the source binlog. This means parallelism is available regardless of the source's binlog_transaction_dependency_tracking setting.

  2. Commit metadata is used as a fallback, not as the primary mechanism. When a writeset can be computed (non-empty PK indices, no errors), writeset-only conflict detection is used. The commitParent field is only used when the writeset is empty (build failure or no row events).

  3. FK awareness is built in. MySQL's MTA does not consider FK constraints in its writeset tracking. The Vitess parallel applier queries information_schema.KEY_COLUMN_USAGE at startup and generates additional writeset keys that create conflicts between child and parent table operations.

  4. Commit ordering is strict. Unlike MySQL's MTA which can commit out of order in WRITESET mode (and reorder via the slave_preserve_commit_order setting), the Vitess parallel applier always commits in order. This simplifies position tracking (single position in _vt.vreplication) and avoids gaps in the position that could confuse WaitForPos or other external observers.

Key Design Trade-offs

Strict commit ordering vs. throughput

Committing in strict order means a slow transaction can stall all commits behind it, even if their MySQL work is done. We accept this because:

  • Position tracking requires monotonic progress
  • WaitForPos and monitoring tools expect a single consistent position
  • The commitLoop itself is fast (just UPDATE _vt.vreplication + COMMIT); the bottleneck is the workers' apply time

Writeset-only detection vs. commit-parent chains

By ignoring the source's commit-parent chain when a valid writeset exists, we achieve parallelism even when the source uses COMMIT_ORDER. The trade-off is that we must build writesets ourselves, which adds CPU overhead in the scheduleLoop. In practice, writeset computation is cheap (xxhash digests of PK values with no heap allocations).

FK batching trade-off

Without FK constraints, transaction batching reduces MySQL COMMIT overhead. With FK constraints, batching merges independent parent/child operations into single large writesets that always conflict, destroying parallelism. The solution is to skip batching when FK refs are present, accepting more frequent COMMITs in exchange for actual parallelism.

Head-of-line blocking in the pending queue

popReadyLocked stops scanning at the first non-ready, non-noConflict transaction. This prevents dispatching a later transaction whose inflight state could block the earlier one from ever becoming ready — which would deadlock with the commitLoop's strict ordering. The trade-off is that a single blocked transaction at the head of the queue stalls all transactions behind it, even non-conflicting ones. This is a correctness-over-throughput choice.

Per-worker vplayer view

Each worker builds a narrow vplayer value once at startup via workerLocalVPlayer, containing only the fields workers are allowed to share (tablePlans + its mutex/version, replicatorPlan, serialMu, etc). This gives each worker its own dbClient/query/commit bindings without fine-grained locking, while structurally preventing workers from reaching into main-goroutine-owned vplayer state. The shared mutable fields on vplayer are pointers (*sync.Mutex, *sync.RWMutex, *atomic.Int64, *atomic.Pointer) so the view and the orchestrator's vplayer alias the same synchronization state; per-session state like foreign_key_checks lives on the vdbClient (per connection) instead of the vplayer. Note this makes "vplayer is safely copyable" a load-bearing property: new mutable non-pointer fields must not be added without considering the worker view.

Source Files

File Purpose
parallel_apply.go Main orchestration: applyEventsParallel, scheduleLoop, scheduleItems, workerLoop, commitLoop, sync.Pool, lag computation, post-DDL stale-plan barriers
parallel_apply_scheduler.go applyScheduler: enqueue, nextReady, isReadyLocked, inflight tracking, pending queue management
parallel_apply_worker.go applyWorker: connection setup, applyEvent (field swapping on vplayer copy)
parallel_apply_writeset.go buildTxnWriteset, writesetKeysForChange, writesetKeysForFKRef, snapshotTablePlans, queryFKRefs

Benchmarks

Benchmark Suite

A local benchmark suite is included in examples/benchmark/ to measure parallel applier throughput in isolation. The suite consists of:

Script Purpose
bench_setup.sh Brings up commerce + customer keyspaces with configurable PARALLEL_WORKERS
bench_run.sh Runs one benchmark iteration (seed → copy → stop → backlog → drain → validate)
bench_generate_load.sh Generates deterministic mixed workloads via Python and pipes them through vtgate
bench_compare.sh A/B comparison: runs serial (workers=1) then parallel (workers=4) and reports results
create_bench_schema.sql Schema for 4 benchmark tables (bench_orders, bench_events, bench_accounts, bench_logs)
vschema_bench.json VSchema for the benchmark tables

Methodology

The benchmark uses a pause-load-resume approach to isolate applier throughput from vstreamer rate:

  1. Setup: Create a MoveTables workflow from commercecustomer and let the copy phase complete
  2. Stop: Pause the workflow so no replication occurs
  3. Index: Add ~25 secondary indexes per table on the target to create expensive index maintenance overhead during apply — this ensures each statement does meaningful I/O work
  4. Backlog: Generate 200K mixed operations (50% INSERT, 20% UPDATE, 5% DELETE, 15% bulk UPDATE, 10% bulk DELETE) on the source while the workflow is stopped, using a fixed random seed for deterministic, repeatable workloads
  5. Drain: Start the workflow and measure wall-clock time until the target's GTID position catches up to the source's GTID position
  6. Validate: Verify row counts match between source and target for all 4 tables

Timing is GTID-based, not lag-based. The benchmark captures the source's GTID sequence number after generating the backlog, then polls the target's GTID sequence until it reaches or exceeds the source. This provides an exact measurement of when all backlog transactions have been applied.

Configuration

The benchmark tunes MySQL to remove fsync as a variable and isolate the applier:

  • 32MB InnoDB buffer pool on target tablets (with innodb_buffer_pool_chunk_size=1M to allow sub-128MB sizing)
  • innodb_flush_log_at_trx_commit=0, sync_binlog=0 on all tablets
  • innodb_change_buffering=none on target tablets (forces immediate B-tree page reads on every DML)
  • durability-policy=none (no semi-sync)
  • Default relay log size (250KB / 5000 items)
  • VReplication experimental flags=13 (OptimizeInserts + VPlayerBatching + AllowNoBlobBinlogRowImage)

Results

Configuration Serial (1 worker) Parallel (4 workers) Speedup
25 indexes, 32MB buffer pool, default relay log 489s, 408 ops/sec 284s, 704 ops/sec 1.72x

Row count validation: PASS (all 4 tables match source ↔ target).

The numbers above reflect a representative single A/B run on the same hardware. Across 10+ iterations of the same benchmark, individual drain times vary (parallel: 283–330s, 606–706 ops/sec; serial: 419–521s, 383–477 ops/sec), with single-run speedup ratios ranging from ~1.4x to 1.7x. The parallel applier consistently hits 600+ ops/sec and 280–330s drain; serial throughput is more sensitive to OS-level page cache and disk I/O variance between runs, which is the dominant source of speedup-ratio variance.

Key Findings

  • The relay log size is critical. Large relay logs (250MB / 500K items) let the serial applier build massive mega-transactions — merging all ~194K source transactions into a single MySQL transaction with one COMMIT. This amortizes per-commit overhead so effectively that the serial applier outperforms the parallel applier. Default relay log sizes (250KB / 5000 items) limit serial batches to ~200 source transactions per commit, which exposes the applier bottleneck and allows parallelism to help.

  • Buffer pool sizing matters for parallel workers. With an 8MB buffer pool and 4 workers, each worker effectively has 2MB of InnoDB buffer pool, causing destructive cache thrashing between workers. Sizing the buffer pool so each worker gets at least as much cache as the serial applier gets total (32MB / 4 = 8MB per worker) eliminates this problem.

  • FIELD event handling affects parallelism. FIELD events (table metadata) previously fell through to the default case in scheduleItems, setting curRowOnly=false and causing ~50% of transactions to be forceGlobal=true — effectively serializing the scheduler. Adding an explicit FIELD event handler reduced forceGlobal to ~1.8% (only at startup when table plans haven't been applied yet).

  • Transaction batching (maxBatchedCommits) reduces per-commit overhead. Setting maxBatchedCommits = workerCount * 4 (=16 for 4 workers) merges multiple source transactions into each mega-transaction, reducing the number of MySQL COMMITs + position updates + scheduler dispatches by 16x while still producing enough independent mega-transactions to keep all workers busy.

Related Issue(s)

Checklist

  • "Backport to:" labels have been added if this change should be back-ported to release branches
  • If this change is to be back-ported to previous releases, a justification is included in the PR description
  • Tests were added or are not required
  • Did the new or modified tests pass consistently locally and on CI?
  • Documentation was added or is not required

AI Disclosure

I worked with OpenCode + Codex-5.2 and Opus 4.6 + Copilot on this.

This is modeled after MySQL's parallel worker
implementation and the default/serial vplayer/vreplicator
implementation in Vitess.

Signed-off-by: Matt Lord <mattalord@gmail.com>
@github-actions github-actions Bot added this to the v24.0.0 milestone Mar 2, 2026
@vitess-bot

vitess-bot Bot commented Mar 2, 2026

Copy link
Copy Markdown
Contributor

Review Checklist

Hello reviewers! 👋 Please follow this checklist when reviewing this Pull Request.

General

  • Ensure that the Pull Request has a descriptive title.
  • Ensure there is a link to an issue (except for internal cleanup and flaky test fixes), new features should have an RFC that documents use cases and test cases.

Tests

  • Bug fixes should have at least one unit or end-to-end test, enhancement and new features should have a sufficient number of tests.

Documentation

  • Apply the release notes (needs details) label if users need to know about this change.
  • New features should be documented.
  • There should be some code comments as to why things are implemented the way they are.
  • There should be a comment at the top of each new or modified test to explain what the test does.

New flags

  • Is this flag really necessary?
  • Flag names must be clear and intuitive, use dashes (-), and have a clear help text.

If a workflow is added or modified:

  • Each item in Jobs should be named in order to mark it as required.
  • If the workflow needs to be marked as required, the maintainer team must be notified.

Backward compatibility

  • Protobuf changes should be wire-compatible.
  • Changes to _vt tables and RPCs need to be backward compatible.
  • RPC changes should be compatible with vitess-operator
  • If a flag is removed, then it should also be removed from vitess-operator and arewefastyet, if used there.
  • vtctl command output order should be stable and awk-able.

@vitess-bot vitess-bot Bot added NeedsWebsiteDocsUpdate What it says NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsIssue A linked issue is missing for this Pull Request NeedsBackportReason If backport labels have been applied to a PR, a justification is required labels Mar 2, 2026
@mattlord mattlord added Type: Feature Component: VReplication NeedsWebsiteDocsUpdate What it says and removed NeedsDescriptionUpdate The description is not clear or comprehensive enough, and needs work NeedsWebsiteDocsUpdate What it says NeedsIssue A linked issue is missing for this Pull Request NeedsBackportReason If backport labels have been applied to a PR, a justification is required labels Mar 2, 2026
@mattlord mattlord changed the title VReplication: Implement Parallel Applier VReplication: Implement Experimental Parallel Applier Mar 2, 2026
@mattlord mattlord changed the title VReplication: Implement Experimental Parallel Applier VReplication: Implement Parallel Applier Mar 2, 2026
Signed-off-by: Matt Lord <mattalord@gmail.com>
@mattlord mattlord force-pushed the vstream_parallel_replication branch from bef2cfc to 1092f25 Compare March 2, 2026 01:33
@codecov

codecov Bot commented Mar 2, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 87.50438% with 357 lines in your changes missing coverage. Please review.
✅ Project coverage is 62.04%. Comparing base (70c7a72) to head (4fc1b73).
⚠️ Report is 323 commits behind head on main.

Files with missing lines Patch % Lines
...ablet/tabletmanager/vreplication/parallel_apply.go 86.29% 172 Missing ⚠️
...letmanager/vreplication/parallel_apply_writeset.go 89.48% 67 Missing ⚠️
go/vt/vttablet/tabletserver/vstreamer/vstreamer.go 69.07% 30 Missing ⚠️
.../vt/vttablet/tabletmanager/vreplication/vplayer.go 88.63% 20 Missing ⚠️
...vttablet/tabletmanager/vreplication/vreplicator.go 86.33% 19 Missing ⚠️
...blet/tabletmanager/vreplication/replicator_plan.go 71.42% 14 Missing ⚠️
go/vt/vttablet/onlineddl/executor.go 73.68% 10 Missing ⚠️
...abletmanager/vreplication/parallel_apply_worker.go 91.17% 9 Missing ⚠️
...etmanager/vreplication/parallel_apply_scheduler.go 96.86% 7 Missing ⚠️
go/vt/binlog/binlog_streamer.go 72.22% 5 Missing ⚠️
... and 2 more

❗ There is a different number of reports uploaded between BASE (70c7a72) and HEAD (4fc1b73). Click for more details.

HEAD has 1 upload less than BASE
Flag BASE (70c7a72) HEAD (4fc1b73)
1 0
Additional details and impacted files
@@             Coverage Diff             @@
##             main   #19535       +/-   ##
===========================================
- Coverage   69.67%   62.04%    -7.63%     
===========================================
  Files        1614      120     -1494     
  Lines      216793    23137   -193656     
===========================================
- Hits       151044    14355   -136689     
+ Misses      65749     8782    -56967     
Flag Coverage Δ
partial 62.04% <87.50%> (?)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Signed-off-by: Matt Lord <mattalord@gmail.com>
@mattlord mattlord force-pushed the vstream_parallel_replication branch from b97160e to 5ab9591 Compare March 2, 2026 02:10
mattlord added 4 commits March 2, 2026 02:11
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
@mattlord mattlord force-pushed the vstream_parallel_replication branch from 3635e78 to 90a5739 Compare March 2, 2026 15:11
Signed-off-by: Matt Lord <mattalord@gmail.com>
@mattlord mattlord force-pushed the vstream_parallel_replication branch from 8b8af9e to c3ea987 Compare March 2, 2026 17:55
mattlord added 2 commits March 2, 2026 18:14
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 76 out of 78 changed files in this pull request and generated no new comments.

mattlord added 2 commits May 15, 2026 12:26
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 78 out of 80 changed files in this pull request and generated no new comments.

@promptless

promptless Bot commented May 15, 2026

Copy link
Copy Markdown
Contributor

📝 Documentation updates suggested

I've created documentation updates for this PR:

vitessio/website - VReplication parallel applier docs:

  • Added --vreplication-parallel-replication-workers flag to VReplication flags reference
  • Added "Parallel Applier (Experimental)" section to VReplication overview
  • Includes experimental warnings, tuning guidance, and --config-overrides examples

vitessio/vitess - Changelog entry:

  • Added changelog entry for v26.0.0 documenting the new experimental feature
  • Covers flag usage, writeset conflict detection, commit ordering, and FK awareness

🤖 Generated by Promptless

@mhamza15 mhamza15 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! Left just a few comments. The overall design looks sound. One thought I had: to my knowledge, if the current transaction is blocked by a dependency on an in-flight one, future transactions will be blocked right until the current one is unblocked right? Something like:

TX 1: row A  // <-- in-flight
TX 2: row A // <-- head of the queue, has a dependency on TX 1, will wait
TX 3: row B // <-- blocked, even though it is not dependent on any in-flight or previous transactions in the queue

If we track each transaction's dependencies, we could allow TX 3 to run in parallel, no?

Comment thread examples/benchmark/bin/zksrv.sh Outdated
Comment thread go/vt/vttablet/tabletmanager/vreplication/parallel_apply.go
Comment thread go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Comment thread go/vt/vttablet/tabletmanager/vreplication/vplayer.go Outdated

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 58 out of 59 changed files in this pull request and generated 1 comment.

Comment thread go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Signed-off-by: Matt Lord <mattalord@gmail.com>

@arthurschreiber arthurschreiber left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good to me! ❤️

Here's a few things I noticed while reviewing this and trying it locally:


I think there's a bug in the benchmarks that you provided. The benchmark waits for _vt.vreplication.state == 'Running' to be true before stopping the workflow and generating a backlog. It seems that this signal is not actually accurate, which then causes the backlog to be applied by the copy phase.

I think a better check would be to verify the per-table target row count matches source row count for all tables as the signal for the copy phase to have finished.


In my testing (MacBook Pro with M4 Pro), MySQL was consistently the bottleneck, not the parallel vapplier code. I think it would be great if we could run benchmarks on a more powerful machine to see how the new applier scales when not limited by MySQL.

There seem to be a few inefficiencies which don't show up in the benchmarks (again, being limited by MySQL), but might still be good to resolve, because inefficiencies in the vapplier can still cause inefficiencies in other parts of vttablet (e.g. through higher gc pressure etc).

I left comments on the relevant pieces that are part of this code.


There's two allocation specific things I noticed, but they're both not trivial to fix and are not really introduced by this PR - so I'm just writing this down with no expectation of this being fixed here:

  • a lot of allocations are caused by decoding the incoming binlog event stream. That's a more general grpc / protobuf issue we have also in other places, and I believe it's not easily fixable here because the lifetime of the objects is unclear.

  • the other large contributor to allocations I saw was the creation of strings.Builder structs in ParsedQuery.GenerateQuery. There is a ParsedQuery.Append function that allows re-using a strings.Builder, but the harder question to answer is where those builders should live - there's the non-batch, the batched, and the new parallel worker paths that all need to be taken into account.


One more thing I just noticed, which might be simpler to implement:

applyChange in replicator_plan.go generates a per-row bindvar map, and re-computes the before / after field names. The binder map should be reusable (clear it between uses), and the field names should also be static. But I'm also not sure how much this actually costs given that, as pointed out before, MySQL seems to be the bottleneck anyway.

if row == nil {
return false
}
relevantColumns := make(map[int]struct{})

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

relevantColumns here seems to be rebuild on every call, and is called twice per row. Isn't the content of this stable across all the rows? If so, we probably should compute this once and reuse the result.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, I think we can iterate plan.PKIndices and the FK columns directly against row.Lengths and check for < 0 without allocating anything at all.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep — it's now cached per table (the same pattern as fieldIdxCache), built once per table per fetch instead of per row change; addressed in 21f86d3. I looked at the fully alloc-free direct iteration too, but the FK-joined columns need a name→index resolution, so the cached set keeps the per-row path to a single Lengths scan.

fieldIdx[strings.ToLower(f.Name)] = i
}
}
indexes := make([]int, 0, len(plan.IdentityColumns))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The list of indexes on the table should be stable. I think we should be able to pre-compute it, instead of re-computing it for every row.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Similar to how fieldIdxCache works).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 4589d8d — the identity column positions are now resolved once per table and cached alongside fieldIdx, and the per-change path takes them pre-resolved.

Comment on lines +119 to +122
payload := make([]byte, len(writesetTextValueMarker)+len(scratch))
copy(payload, writesetTextValueMarker[:])
copy(payload[len(writesetTextValueMarker):], scratch[:])
writesetDigestAddPayload(d, payload)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be optimized to not require allocating payload here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fun one: the allocation benchmark shows escape analysis was already keeping this on the stack — identical 1 alloc/op before and after (that remaining alloc is the xxhash.Digest escaping in the bench harness). 4589d8d switches it to a fixed-size array anyway so the no-alloc property is structural rather than escape-analysis-dependent.

func writesetDigestInit(d *xxhash.Digest, tableName string) {
d.Reset()
d.WriteString(tableName)
d.Write([]byte{':'})

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract []byte{':'} into a const?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go won't let us make a []byte const, but 4589d8d extracts it to a package-level array (writesetKeySeparator) — named and guaranteed alloc-free.

…ication

Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 58 out of 59 changed files in this pull request and generated no new comments.

Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 55 out of 56 changed files in this pull request and generated 1 comment.

Comment thread go/vt/vttablet/tabletmanager/vreplication/controller.go
mattlord added 2 commits June 12, 2026 20:11
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 55 out of 56 changed files in this pull request and generated no new comments.

mattlord added 2 commits June 12, 2026 22:49
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 56 out of 57 changed files in this pull request and generated 1 comment.


streamErr := errors.New("stream ended after buffered events")
cp := dbconfigs.New(&mysql.ConnParams{DbName: testenv.DBName})
vse := &Engine{keyspace: testenv.DBName, shard: testenv.DefaultShard, throttledCounts: stats.NewCounter("", "")}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one's safe by design: throttle.Client.ThrottleCheckOK nil-checks its receiver (client.go — if c == nil { return emptyCheckResult, true }), so a bare Engine's nil throttlerClient short-circuits to "not throttled" rather than panicking — which is why this test passes deterministically, including under -race. The other throttle tests initialize throttlerClient because they want throttling (via TestingAlwaysThrottledName), not because nil panics. Added a comment on the fixture so this doesn't keep getting flagged.

mattlord added 2 commits June 13, 2026 00:17
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 56 out of 57 changed files in this pull request and generated 1 comment.

Comment thread go/vt/vttablet/tabletmanager/vreplication/parallel_apply_scheduler.go Outdated
mattlord added 2 commits June 13, 2026 01:56
Signed-off-by: Matt Lord <mattalord@gmail.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 56 out of 57 changed files in this pull request and generated no new comments.

Signed-off-by: Matt Lord <mattalord@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants